#include "pipe_module.h"

// time function's header.
#if defined(_WIN32)
#include <windows.h>
#include "time.h"
#include <sys/timeb.h>
#include <direct.h>
#include <io.h>
#pragma warning(disable:4996)
#else
#include <unistd.h>
#include <sys/stat.h>
#include <time.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/time.h>
#endif

namespace anet {
	namespace pipe {
		std::atomic_bool pipeLogOpen = true;

		// get time info: {second, millisecond}
		inline std::pair<time_t, int> getTimeInfo() {
        #if defined(_WIN32) || defined(_WIN64)
			struct timeb tp;
			ftime(&tp);
			return { tp.time,tp.millitm };
        #else
			struct timeval now;
			struct timezone zone;
			gettimeofday(&now, &zone);
			return { now.tv_sec,now.tv_usec / 1000 };
        #endif
		}

		// get current time.
		template <size_t N>
		inline const char* buildCurrentTime(char(&timeInfo)[N]) {
			auto timePair = getTimeInfo();
			auto ms = timePair.second;
			auto s = timePair.first;
			auto tm = localtime(&s);
			int n = std::snprintf(timeInfo, sizeof(timeInfo),
				"%d-%02d-%02d %02d:%02d:%02d.%03d",
				1900 + tm->tm_year, tm->tm_mon + 1, tm->tm_mday,
				tm->tm_hour, tm->tm_min, tm->tm_sec, ms
			);
			assert(n > 0 && n <= int(sizeof(timeInfo)));
			return timeInfo;
		}

		// output to stdout.
        #define printArgs(format) {         \
		    va_list args;                   \
		    va_start(args, format);         \
		    vfprintf(stdout, format, args); \
		    va_end(args);                   \
	    }

		// default implementation of ILog for internal log.
		class pipeDefaultLog : public IPipeLog {
		public:
			pipeDefaultLog() = default;
			virtual ~pipeDefaultLog() = default;

		public:
			virtual bool setLevel(int level) {
				return true;
			}
			virtual int getLevel() const {
				return 0;
			}
			virtual void Debug(const char* format, ...) override {
				if (!pipeLogOpen) {
					return;
				}

				char time[1024];
				buildCurrentTime(time);
				fprintf(stdout, "%s ", time);

				printArgs(format);
				fprintf(stdout, "%s", "\n");
			}
			virtual void Info(const char* format, ...) override {
				if (!pipeLogOpen) {
					return;
				}

				char time[1024];
				buildCurrentTime(time);
				fprintf(stdout, "%s ", time);

				printArgs(format);
				fprintf(stdout, "%s", "\n");
			}
			virtual void Warn(const char* format, ...) override {
				if (!pipeLogOpen) {
					return;
				}

				char time[1024];
				buildCurrentTime(time);
				fprintf(stdout, "%s ", time);

				printArgs(format);
				fprintf(stdout, "%s", "\n");
			}
			virtual void Crit(const char* format, ...) override {
				if (!pipeLogOpen) {
					return;
				}

				char time[1024];
				buildCurrentTime(time);
				fprintf(stdout, "%s ", time);

				printArgs(format);
				fprintf(stdout, "%s", "\n");
			}
		};
		/////////////////////////////////////////////////////////////////////////

		// declare a global default log.
		static pipeDefaultLog internalLog;
		IPipeLog* gLog = &internalLog;

		static const uint32 gTimeWheelSize = 63;
		static const uint32 gPublishInterval = 5 * 1000;

		// implementation global GetPipeModule().
		IPipeModule* GetPipeModule() {
			return &CPipeModule::instance();
		}

		CPipeModule::CPipeModule() : m_loop(1), 
			m_server(m_loop), m_pipeReporter(nullptr), 
			m_timeWheel(gTimeWheelSize) {
		}

		CPipeModule::~CPipeModule() {
			m_server.stop();
		}

		CPipeModule& CPipeModule::instance() {
			static CPipeModule gInstance;
			return gInstance;
		}

		bool CPipeModule::Run(int count) {
			bool ret = false;

			// anet CMainService::instance().run.
			if (anet::tcp::CMainService::instance().run(count)) {
				ret = true;
			}

			// timer run.
			m_timeWheel.update(count);

			// service discovery run.
			m_serviceDiscovery->Run(count);

			return ret;
		}

		uint32 CPipeModule::GetLocalId() const {
			return m_pipeConfig.GetServerId();
		}

		bool CPipeModule::Init(const char* pipePath, IPipeReporter* reporter) {
			if (reporter == nullptr) {
				return false;
			}

			if (!m_pipeConfig.Load(pipePath)) {
				return false;
			}
			return this->start(reporter);
		}

		IPipeReporter* CPipeModule::GetReporter() {
			return m_pipeReporter;
		}

		userSessionFactory* CPipeModule::GetSessionFactory() {
			return &m_sessionFactory;
		}

		bool CPipeModule::start(IPipeReporter* reporter) {
			if (reporter == nullptr) {
				return false;
			}
			m_pipeReporter = reporter;

			// try to listen on local ip and port if it is empty.
			const auto& listenIP = m_pipeConfig.GetListenIP();
			if (!listenIP.empty()) { // only it is not empty.
				m_server.setPacketParser(&m_codec);
				m_server.setSessionFactory(&m_sessionFactory);
				if (!m_server.start(m_pipeConfig.GetListenAddr())) {
					gLog->Crit("listen on:%s error", m_pipeConfig.GetListenAddr());
					return false;
				}
			}
			// initialize the service discovery module.
			return this->initServiceDiscovery();
		}

		bool CPipeModule::initServiceDiscovery() {
			// node name: name:id.
			auto name = m_pipeConfig.GetServerName();
			auto id = m_pipeConfig.GetServerId();
			std::string nodeInfo(name);
			nodeInfo += ":"+std::to_string(id);

			// node address.
			const auto& addr = m_pipeConfig.GetListenAddr();
			serviceDiscoveryFunc func = std::bind(&CPipeModule::onNewServiceDiscovery, this, std::placeholders::_1);
			m_serviceDiscovery = std::make_unique<serviceDiscoveryType>(nodeInfo, addr, "", func);

			// initialize discovery node.
			const auto& serviceDiscoveryInfo = m_pipeConfig.GetServiceDiscoveryInfo();
			const auto& ip = serviceDiscoveryInfo.ip;
			const auto& port = serviceDiscoveryInfo.port;
			const auto& passwd = serviceDiscoveryInfo.password;
			const auto& index = serviceDiscoveryInfo.index;
			const auto& key = serviceDiscoveryInfo.key;
			return m_serviceDiscovery->Init(ip, port, passwd, index, gPublishInterval, key);
		}

		bool CPipeModule::onNewServiceDiscovery(const serviceDiscoveryEntity* node) {
			if (node == nullptr) {
				return false;
			}
			if (node->name.empty()) {
				return false;
			}
			gLog->Debug("find new node(%s), my node is:%s", node->name.c_str(), m_pipeConfig.GetServerName().c_str());

			std::vector<std::string> vec;
			strSplits(node->name, ":", vec);
			if (vec.size() != 2) {
				gLog->Debug("invalid node name:%s, which must be like name:id", node->name.c_str());
				return true;
			}

			// node name and its id.
			const auto nodeName = vec[0];
			const auto nodeId = std::atoi(vec[1].c_str());
			if (m_pipeConfig.IsConnectTo(nodeName, nodeId)) {
				auto session = m_sessionFactory.createSession();
				if (session == nullptr) {
					gLog->Crit("create session return nullptr");
					return false;
				}

				auto pSharePtrSession = (sharePtrSession*)session;
				assert(pSharePtrSession != nullptr && "pipe session is nullptr");

				CPipeSession* pipeSession = pSharePtrSession->getProxySession();
				assert(pipeSession != nullptr && "pipe session is nullptr");

				// initialize the pipe's session.
				pipeSession->SetRemoteId(nodeId);
				pipeSession->SetRemoteName(nodeName);
				pipeSession->SetToken(m_pipeConfig.GetNodeToken(nodeName));
				pipeSession->SetMode(ePipeMode::pipe_mode_connector);

				// create client object.
				tcpClientPtr client = std::make_shared<tcpClient>(m_loop);
				pipeSession->SetClient(client);

				// initialize client's codec and session.
				client->setPacketParser(&m_codec);
				client->setSession(session);

				// try to connect to the remote server.
				if (!client->asyncConnect(node->endpoint,
					// reconnect to the server if there is an error.
					[client,this](const char* ip, unsigned short port, int err) {
					   (void)ip;(void)port;(void)err;
					   m_timeWheel.add_once_timer([client](void* pData) {
						   client->reAsyncConnect();
					   }, 5000);
					}
				)) {
					return false;
				}
				return true;
			}
			gLog->Debug("%s does not connect %s node",m_pipeConfig.GetServerName().c_str(), node->endpoint.c_str());
			return true;
		}

		timeWheelType& CPipeModule::GetTimeWheel() {
			return m_timeWheel;
		}

		bool CPipeModule::CheckConnectInfo(const std::string& name, const std::string& token) {
			return token == m_pipeConfig.GetNodeToken(name);
		}

		const std::string& CPipeModule::GetLocalName() const {
			return m_pipeConfig.GetServerName();
		}

		void CPipeModule::RemoveService(const std::string& name) {
			m_serviceDiscovery->RemoveService(name);
		}

		CPipeConfig* CPipeModule::GetPipeConfig() {
			return &m_pipeConfig;
		}
	}
}
